
package org.apache.solr.cloud;

import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClosableThread;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.HashPartitioner;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.cloud.ZooKeeperException;
import org.apache.solr.handler.component.ShardHandler;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Cluster leader. Responsible node assignments, cluster state file?
 */
public class Overseer {

    public static final String QUEUE_OPERATION = "operation";
    private static final int STATE_UPDATE_DELAY = 500;  // delay between cloud state updates
    private static Logger log = LoggerFactory.getLogger(Overseer.class);

    private class ClusterStateUpdater implements Runnable, ClosableThread {

        private static final String DELETECORE = "deletecore";
        private final ZkStateReader reader;
        private final SolrZkClient zkClient;
        private final String myId;
        //queue where everybody can throw tasks
        private final DistributedQueue stateUpdateQueue;
        //Internal queue where overseer stores events that have not yet been published into cloudstate
        //If Overseer dies while extracting the main queue a new overseer will start from this queue 
        private final DistributedQueue workQueue;
        private volatile boolean isClosed;

        public ClusterStateUpdater(final ZkStateReader reader, final String myId) {
            this.zkClient = reader.getZkClient();
            this.stateUpdateQueue = getInQueue(zkClient);
            this.workQueue = getInternalQueue(zkClient);
            this.myId = myId;
            this.reader = reader;
        }

        @Override
        public void run() {

            if(!this.isClosed && amILeader()) {
                // see if there's something left from the previous Overseer and re
                // process all events that were not persisted into cloud state
                synchronized(reader.getUpdateLock()) { //XXX this only protects against edits inside single node
                    try {
                        byte[] head = workQueue.peek();

                        if(head != null) {
                            reader.updateClusterState(true);
                            ClusterState clusterState = reader.getClusterState();
                            log.info("Replaying operations from work queue.");

                            while(head != null && amILeader()) {
                                final ZkNodeProps message = ZkNodeProps.load(head);
                                final String operation = message.getStr(QUEUE_OPERATION);
                                clusterState = processMessage(clusterState, message, operation);
                                zkClient.setData(ZkStateReader.CLUSTER_STATE, ZkStateReader.toJSON(clusterState), true);
                                workQueue.remove();
                                head = workQueue.peek();
                            }
                        }
                    }
                    catch(KeeperException e) {
                        if(e.code() == KeeperException.Code.SESSIONEXPIRED || e.code() == KeeperException.Code.CONNECTIONLOSS) {
                            log.warn("Solr cannot talk to ZK");
                            return;
                        }
                        SolrException.log(log, "", e);
                        throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
                    }
                    catch(InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }

            log.info("Starting to work on the main queue");
            while(!this.isClosed && amILeader()) {
                synchronized(reader.getUpdateLock()) {
                    try {
                        byte[] head = stateUpdateQueue.peek();

                        if(head != null) {
                            reader.updateClusterState(true);
                            ClusterState clusterState = reader.getClusterState();

                            while(head != null) {
                                final ZkNodeProps message = ZkNodeProps.load(head);
                                final String operation = message.getStr(QUEUE_OPERATION);

                                clusterState = processMessage(clusterState, message, operation);
                                workQueue.offer(head);
                                stateUpdateQueue.remove();
                                head = stateUpdateQueue.peek();
                            }
                            zkClient.setData(ZkStateReader.CLUSTER_STATE, ZkStateReader.toJSON(clusterState), true);
                        }
                        // clean work queue
                        while(workQueue.poll() != null);
                    }
                    catch(KeeperException e) {
                        if(e.code() == KeeperException.Code.SESSIONEXPIRED || e.code() == KeeperException.Code.CONNECTIONLOSS) {
                            log.warn("Overseer cannot talk to ZK");
                            return;
                        }
                        SolrException.log(log, "", e);
                        throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
                    }
                    catch(InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }

                try {
                    Thread.sleep(STATE_UPDATE_DELAY);
                }
                catch(InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }

        private ClusterState processMessage(ClusterState clusterState, final ZkNodeProps message, final String operation) {

            switch(operation) {
                case "state":
                    clusterState = updateState(clusterState, message);
                    break;
                case DELETECORE:
                    clusterState = removeCore(clusterState, message);
                    break;
                case ZkStateReader.LEADER_PROP:
                    StringBuilder sb = new StringBuilder();
                    String baseUrl = message.getStr(ZkStateReader.BASE_URL_PROP);
                    String coreName = message.getStr(ZkStateReader.CORE_NAME_PROP);
                    sb.append(baseUrl);
                    if(baseUrl != null && !baseUrl.endsWith("/")) {
                        sb.append("/");
                    }
                    sb.append(coreName == null ? "" : coreName);
                    if(!(sb.substring(sb.length() - 1).equals("/"))) {
                        sb.append("/");
                    }
                    clusterState = setShardLeader(clusterState,
                            message.getStr(ZkStateReader.COLLECTION_PROP),
                            message.getStr(ZkStateReader.SHARD_ID_PROP),
                            sb.length() > 0 ? sb.toString() : null);
                    break;
                default:
                    throw new RuntimeException("unknown operation:" + operation + " contents:" + message.getProperties());
            }
            return clusterState;
        }

        private boolean amILeader() {

            try {
                ZkNodeProps props = ZkNodeProps.load(zkClient.getData("/overseer_elect/leader", null, null, true));
                if(myId.equals(props.getStr("id"))) {
                    return true;
                }
            }
            catch(KeeperException e) {
                log.warn("", e);
            }
            catch(InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            log.info("According to ZK I (id=" + myId + ") am no longer a leader.");

            return false;
        }

        /**
         * Try to assign core to the cluster.
         */
        private ClusterState updateState(ClusterState state, final ZkNodeProps message) {

            final String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
            final String zkCoreNodeName = message.getStr(ZkStateReader.NODE_NAME_PROP) + "_" + message.getStr(ZkStateReader.CORE_NAME_PROP);
            final Integer numShards = message.getStr(ZkStateReader.NUM_SHARDS_PROP) != null ? Integer.parseInt(message.getStr(ZkStateReader.NUM_SHARDS_PROP)) : null;

            //collection does not yet exist, create placeholders if num shards is specified
            if(!state.getCollections().contains(collection) && numShards != null) {
                state = createCollection(state, collection, numShards);
            }

            // use the provided non null shardId
            String sliceName = message.getStr(ZkStateReader.SHARD_ID_PROP);
            if(sliceName == null) {
                String nodeName = message.getStr(ZkStateReader.NODE_NAME_PROP);
                //get shardId from ClusterState
                sliceName = getAssignedId(state, nodeName, message);
            }
            if(sliceName == null) {
                //request new shardId 
                sliceName = AssignShard.assignShard(collection, state, numShards);
            }

            Slice slice = state.getSlice(collection, sliceName);
            Map<String, Object> replicaProps = new LinkedHashMap<>();

            replicaProps.putAll(message.getProperties());
            // System.out.println("########## UPDATE MESSAGE: " + JSONUtil.toJSON(message));
            if(slice != null) {
                Replica oldReplica = slice.getReplicasMap().get(zkCoreNodeName);
                if(oldReplica != null && oldReplica.containsKey(ZkStateReader.LEADER_PROP)) {
                    replicaProps.put(ZkStateReader.LEADER_PROP, oldReplica.get(ZkStateReader.LEADER_PROP));
                }
            }

            // we don't put num_shards in the clusterstate
            replicaProps.remove(ZkStateReader.NUM_SHARDS_PROP);
            replicaProps.remove(QUEUE_OPERATION);

            Replica replica = new Replica(zkCoreNodeName, replicaProps);
            // TODO: where do we get slice properties in this message?  or should there be a separate create-slice message if we want that?

            Map<String, Object> sliceProps = null;
            Map<String, Replica> replicas;

            if(slice != null) {
                sliceProps = slice.getProperties();
                replicas = slice.getReplicasCopy();
            }
            else {
                replicas = new HashMap<>(1);
            }

            replicas.put(replica.getName(), replica);
            slice = new Slice(sliceName, replicas, sliceProps);

            ClusterState newClusterState = updateSlice(state, collection, slice);
            return newClusterState;
        }

        private ClusterState createCollection(ClusterState state, String collectionName, int numShards) {

            HashPartitioner hp = new HashPartitioner();
            List<HashPartitioner.Range> ranges = hp.partitionRange(numShards, hp.fullRange());

            Map<String, Map<String, Slice>> newStates = new LinkedHashMap<>();
            Map<String, Slice> newSlices = new LinkedHashMap<>();
            newStates.putAll(state.getCollectionStates());
            for(int i = 0; i < numShards; i++) {
                final String sliceName = "shard" + (i + 1);

                Map<String, Object> sliceProps = new LinkedHashMap<>(1);
                sliceProps.put(Slice.RANGE, ranges.get(i));

                newSlices.put(sliceName, new Slice(sliceName, null, sliceProps));
            }
            newStates.put(collectionName, newSlices);
            ClusterState newClusterState = new ClusterState(state.getLiveNodes(), newStates);
            return newClusterState;
        }

        /*
         * Return an already assigned id or null if not assigned
         */
        private String getAssignedId(final ClusterState state, final String nodeName, final ZkNodeProps coreState) {

            final String key = coreState.getStr(ZkStateReader.NODE_NAME_PROP) + "_" + coreState.getStr(ZkStateReader.CORE_NAME_PROP);
            Map<String, Slice> slices = state.getSlices(coreState.getStr(ZkStateReader.COLLECTION_PROP));
            if(slices != null) {
                for(Slice slice : slices.values()) {
                    if(slice.getReplicasMap().get(key) != null) {
                        return slice.getName();
                    }
                }
            }
            return null;
        }

        private ClusterState updateSlice(ClusterState state, String collection, Slice slice) {
            // System.out.println("###!!!### OLD CLUSTERSTATE: " + JSONUtil.toJSON(state.getCollectionStates()));
            // System.out.println("Updating slice:" + slice);

            Map<String, Map<String, Slice>> newCollections = new LinkedHashMap<>(state.getCollectionStates());  // make a shallow copy
            Map<String, Slice> slices = newCollections.get(collection);
            if(slices == null) {
                slices = new HashMap<>(1);
            }
            else {
                slices = new LinkedHashMap<>(slices); // make a shallow copy
            }
            slices.put(slice.getName(), slice);
            newCollections.put(collection, slices);

            // System.out.println("###!!!### NEW CLUSTERSTATE: " + JSONUtil.toJSON(newCollections));

            return new ClusterState(state.getLiveNodes(), newCollections);
        }

        private ClusterState setShardLeader(ClusterState state, String collection, String sliceName, String leaderUrl) {

            final Map<String, Map<String, Slice>> newStates = new LinkedHashMap<>(state.getCollectionStates());

            Map<String, Slice> slices = newStates.get(collection);
            if(slices == null) {
                log.error("Could not mark shard leader for non existing collection:" + collection);
                return state;
            }

            // make a shallow copy and add it to the new collection
            slices = new LinkedHashMap<>(slices);
            newStates.put(collection, slices);

            Slice slice = slices.get(sliceName);
            if(slice == null) {
                log.error("Could not mark leader for non existing slice:" + sliceName);
                return state;
            }
            else {
                // TODO: consider just putting the leader property on the shard, not on individual replicas

                Replica oldLeader = slice.getLeader();

                final Map<String, Replica> newReplicas = new LinkedHashMap<>();
                for(Replica replica : slice.getReplicas()) {

                    // TODO: this should only be calculated once and cached somewhere?
                    String coreURL = ZkCoreNodeProps.getCoreUrl(replica.getStr(ZkStateReader.BASE_URL_PROP), replica.getStr(ZkStateReader.CORE_NAME_PROP));

                    if(replica == oldLeader && !coreURL.equals(leaderUrl)) {
                        Map<String, Object> replicaProps = new LinkedHashMap<>(replica.getProperties());
                        replicaProps.remove(Slice.LEADER);
                        replica = new Replica(replica.getName(), replicaProps);
                    }
                    else if(coreURL.equals(leaderUrl)) {
                        Map<String, Object> replicaProps = new LinkedHashMap<>(replica.getProperties());
                        replicaProps.put(Slice.LEADER, "true");  // TODO: allow booleans instead of strings
                        replica = new Replica(replica.getName(), replicaProps);
                    }
                    newReplicas.put(replica.getName(), replica);
                }

                Map<String, Object> newSliceProps = slice.shallowCopy();
                newSliceProps.put(Slice.REPLICAS, newReplicas);
                Slice newSlice = new Slice(slice.getName(), newReplicas, slice.getProperties());
                slices.put(newSlice.getName(), newSlice);
            }
            return new ClusterState(state.getLiveNodes(), newStates);
        }

        /*
         * Remove core from cloudstate
         */
        private ClusterState removeCore(final ClusterState clusterState, ZkNodeProps message) {

            final String coreNodeName = message.getStr(ZkStateReader.NODE_NAME_PROP) + "_" + message.getStr(ZkStateReader.CORE_NAME_PROP);
            final String collection = message.getStr(ZkStateReader.COLLECTION_PROP);

            final LinkedHashMap<String, Map<String, Slice>> newStates = new LinkedHashMap<>();
            for(String collectionName : clusterState.getCollections()) {
                if(collection.equals(collectionName)) {
                    Map<String, Slice> slices = clusterState.getSlices(collection);
                    LinkedHashMap<String, Slice> newSlices = new LinkedHashMap<>();
                    for(Slice slice : slices.values()) {
                        if(slice.getReplicasMap().containsKey(coreNodeName)) {
                            Map<String, Replica> newReplicas = slice.getReplicasCopy();
                            newReplicas.remove(coreNodeName);
                            Slice newSlice = new Slice(slice.getName(), newReplicas, slice.getProperties());
                            newSlices.put(slice.getName(), newSlice);
                        }
                        else {
                            newSlices.put(slice.getName(), slice);
                        }
                    }
                    int cnt = 0;
                    for(Slice slice : newSlices.values()) {
                        cnt += slice.getReplicasMap().size();
                    }
                    // TODO: if no nodes are left after this unload
                    // remove from zk - do we have a race where Overseer
                    // see's registered nodes and publishes though?
                    if(cnt > 0) {
                        newStates.put(collectionName, newSlices);
                    }
                    else {
                        // TODO: it might be better logically to have this in ZkController
                        // but for tests (it's easier) it seems better for the moment to leave CoreContainer and/or
                        // ZkController out of the Overseer.
                        try {
                            zkClient.clean("/collections/" + collectionName);
                        }
                        catch(InterruptedException e) {
                            SolrException.log(log, "Cleaning up collection in zk was interrupted:" + collectionName, e);
                            Thread.currentThread().interrupt();
                        }
                        catch(KeeperException e) {
                            SolrException.log(log, "Problem cleaning up collection in zk:" + collectionName, e);
                        }
                    }
                }
                else {
                    newStates.put(collectionName, clusterState.getSlices(collectionName));
                }
            }
            ClusterState newState = new ClusterState(clusterState.getLiveNodes(), newStates);
            return newState;
        }

        @Override
        public void close() {
            this.isClosed = true;
        }

        @Override
        public boolean isClosed() {
            return this.isClosed;
        }
    }

    class OverseerThread extends Thread implements ClosableThread {

        private volatile boolean isClosed;

        public OverseerThread(ThreadGroup tg, ClusterStateUpdater clusterStateUpdater) {
            super(tg, clusterStateUpdater);
        }

        public OverseerThread(ThreadGroup ccTg, OverseerCollectionProcessor overseerCollectionProcessor, String string) {
            super(ccTg, overseerCollectionProcessor, string);
        }

        @Override
        public void close() {
            this.isClosed = true;
        }

        @Override
        public boolean isClosed() {
            return this.isClosed;
        }
    }
    private OverseerThread ccThread;
    private OverseerThread updaterThread;
    private volatile boolean isClosed;
    private ZkStateReader reader;
    private ShardHandler shardHandler;
    private String adminPath;

    public Overseer(ShardHandler shardHandler, String adminPath, final ZkStateReader reader) throws KeeperException, InterruptedException {
        this.reader = reader;
        this.shardHandler = shardHandler;
        this.adminPath = adminPath;
    }

    public void start(String id) {

        log.info("Overseer (id=" + id + ") starting");
        createOverseerNode(reader.getZkClient());
        //launch cluster state updater thread
        ThreadGroup tg = new ThreadGroup("Overseer state updater.");
        updaterThread = new OverseerThread(tg, new ClusterStateUpdater(reader, id));
        updaterThread.setDaemon(true);

        ThreadGroup ccTg = new ThreadGroup("Overseer collection creation process.");
        ccThread = new OverseerThread(ccTg, new OverseerCollectionProcessor(reader, id, shardHandler, adminPath), "Overseer-" + id);
        ccThread.setDaemon(true);

        updaterThread.start();
        ccThread.start();
    }

    public void close() {

        isClosed = true;
        if(updaterThread != null) {
            updaterThread.close();
            updaterThread.interrupt();
        }
        if(ccThread != null) {
            ccThread.close();
            ccThread.interrupt();
        }
    }

    /**
     * Get queue that can be used to send messages to Overseer.
     */
    public static DistributedQueue getInQueue(final SolrZkClient zkClient) {
        createOverseerNode(zkClient);
        return new DistributedQueue(zkClient, "/overseer/queue", null);
    }

    /* Internal queue, not to be used outside of Overseer */
    static DistributedQueue getInternalQueue(final SolrZkClient zkClient) {
        createOverseerNode(zkClient);
        return new DistributedQueue(zkClient, "/overseer/queue-work", null);
    }

    /* Collection creation queue */
    static DistributedQueue getCollectionQueue(final SolrZkClient zkClient) {
        createOverseerNode(zkClient);
        return new DistributedQueue(zkClient, "/overseer/collection-queue-work", null);
    }

    private static void createOverseerNode(final SolrZkClient zkClient) {
        try {
            zkClient.create("/overseer", new byte[0], CreateMode.PERSISTENT, true);
        }
        catch(KeeperException.NodeExistsException e) { }
        catch(InterruptedException e) {
            log.error("Could not create Overseer node", e);
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        catch(KeeperException e) {
            log.error("Could not create Overseer node", e);
            throw new RuntimeException(e);
        }
    }
}
